/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.zookeeper;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.client.KeeperException;

/**
 * Tracks the availability and value of a single ZooKeeper node.
 *
 * <p>Utilizes the {@link ZooKeeperListener} interface to get the necessary
 * ZooKeeper events related to the node.
 *
 * <p>This is the base class used by trackers in both the Master and
 * RegionServers.
 */
public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {

	static final Log LOG = LogFactory.getLog(ZooKeeperNodeTracker.class);
	/** Path of node being tracked */
	protected final String node;

	/** Data of the node being tracked */
	private byte[] data;

	/** Used to abort if a fatal error occurs */
	protected final Abortable abortable;

	private boolean stopped = false;

	/**
	 * Constructs a new ZK node tracker.
	 *
	 * <p>After construction, use {@link #start} to kick off tracking.
	 *
	 * @param watcher
	 * @param node
	 * @param abortable
	 */
	public ZooKeeperNodeTracker(ZooKeeperWatcher watcher, String node, Abortable abortable) {
		super(watcher);
		this.node = node;
		this.abortable = abortable;
		this.data = null;
	}

	/**
	 * Starts the tracking of the node in ZooKeeper.
	 *
	 * <p>Use {@link #blockUntilAvailable()} to block until the node is available
	 * or {@link #getData(boolean)} to get the data of the node if it is available.
	 */
	public synchronized void start() {
		try {
			ZKUtil.waitForZKConnectionIfAuthenticating(watcher);
		} catch (InterruptedException e) {
			throw new IllegalStateException("ZookeeperNodeTracker on " + this.node
					+ " interuppted while waiting for SASL Authentication", e);
		}
		this.watcher.registerListener(this);
		try {
			if (ZKUtil.watchAndCheckExists(watcher, node)) {
				byte[] data = ZKUtil.getDataAndWatch(watcher, node);
				if (data != null) {
					this.data = data;
				} else {
					// It existed but now does not, try again to ensure a watch is set
					LOG.debug("Try starting again because there is no data from " + node);
					start();
				}
			}
		} catch (KeeperException e) {
			abortable.abort("Unexpected exception during initialization, aborting", e);
		}
	}

	public synchronized void stop() {
		this.stopped = true;
		notifyAll();
	}

	/**
	 * Gets the data of the node, blocking until the node is available.
	 *
	 * @return data of the node
	 * @throws InterruptedException if the waiting thread is interrupted
	 */
	public synchronized byte[] blockUntilAvailable() throws InterruptedException {
		return blockUntilAvailable(0, false);
	}

	/**
	 * Gets the data of the node, blocking until the node is available or the
	 * specified timeout has elapsed.
	 *
	 * @param timeout maximum time to wait for the node data to be available,
	 * n milliseconds.  Pass 0 for no timeout.
	 * @return data of the node
	 * @throws InterruptedException if the waiting thread is interrupted
	 */
	public synchronized byte[] blockUntilAvailable(long timeout, boolean refresh) throws InterruptedException {
		if (timeout < 0)
			throw new IllegalArgumentException();
		boolean notimeout = timeout == 0;
		long startTime = System.currentTimeMillis();
		long remaining = timeout;
		if (refresh) {
			try {
				this.data = ZKUtil.getDataAndWatch(watcher, node);
			} catch (KeeperException e) {
				abortable.abort("Unexpected exception handling blockUntilAvailable", e);
			}
		}
		while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
			// We expect a notification; but we wait with a
			//  a timeout to lower the impact of a race condition if any
			wait(100);
			remaining = timeout - (System.currentTimeMillis() - startTime);
		}
		return this.data;
	}

	/**
	 * Gets the data of the node.
	 *
	 * <p>If the node is currently available, the most up-to-date known version of
	 * the data is returned.  If the node is not currently available, null is
	 * returned.
	 * @param refresh whether to refresh the data by calling ZK directly.
	 * @return data of the node, null if unavailable
	 */
	public synchronized byte[] getData(boolean refresh) {
		if (refresh) {
			try {
				this.data = ZKUtil.getDataAndWatch(watcher, node);
			} catch (KeeperException e) {
				abortable.abort("Unexpected exception handling getData", e);
			}
		}
		return this.data;
	}

	public String getNode() {
		return this.node;
	}

	@Override
	public synchronized void nodeCreated(String path) {
		if (!path.equals(node))
			return;
		try {
			byte[] data = ZKUtil.getDataAndWatch(watcher, node);
			if (data != null) {
				this.data = data;
				notifyAll();
			} else {
				nodeDeleted(path);
			}
		} catch (KeeperException e) {
			abortable.abort("Unexpected exception handling nodeCreated event", e);
		}
	}

	@Override
	public synchronized void nodeDeleted(String path) {
		if (path.equals(node)) {
			try {
				if (ZKUtil.watchAndCheckExists(watcher, node)) {
					nodeCreated(path);
				} else {
					this.data = null;
				}
			} catch (KeeperException e) {
				abortable.abort("Unexpected exception handling nodeDeleted event", e);
			}
		}
	}

	@Override
	public synchronized void nodeDataChanged(String path) {
		if (path.equals(node)) {
			nodeCreated(path);
		}
	}

	/**
	 * Checks if the baseznode set as per the property 'zookeeper.znode.parent'
	 * exists.
	 * @return true if baseznode exists.
	 *         false if doesnot exists.
	 */
	public boolean checkIfBaseNodeAvailable() {
		try {
			if (ZKUtil.checkExists(watcher, watcher.baseZNode) == -1) {
				return false;
			}
		} catch (KeeperException e) {
			abortable.abort("Exception while checking if basenode exists.", e);
		}
		return true;
	}
}
